package com.heima.kafka.stream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * 消息生产者
 */
public class ProducerkfkFastStart {
    private static final String TOPIC = "itcast-topic-input";

    public static void main(String[] args) {
        //添加kafka的配置信息
        Properties properties = new Properties();
        //配置broker信息
        properties.put("bootstrap.servers", "192.168.200.129:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);

        //生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        //不停发送
        while (true) {
            try {
                //封装消息
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "k001", "hello shanghai kafka stream");
                //发送消息
                producer.send(record);
                System.out.println("发送消息: " + record.value());

                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
